package com.atguigu.flink.watermark;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Created by Smexy on 2023/11/7
 *
 * <p>Demonstrates generating watermarks directly at the source: a file source is
 * combined with a {@link WatermarkStrategy} built from a hand-written
 * {@link WatermarkGenerator}, so records already carry event-time timestamps and
 * watermarks when they leave the source operator.
 */
public class Demo5_CustomWaterMarkGenerate
{
    /** File that is read line by line; each line is parsed as a JSON object with a {@code ts} field. */
    private static final String INPUT_PATH = "data/ws.json";

    /** Out-of-orderness tolerated by the generator, in the same unit as the extracted {@code ts} values. */
    private static final long MAX_OUT_OF_ORDERNESS = 1000L;

    /**
     * Builds and runs the job: read text lines, assign timestamps and watermarks at the
     * source, and print every record.
     *
     * @param args unused
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        FileSource<String> source = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), new Path(INPUT_PATH))
            .build();

        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
            // The supplier hands out a fresh WatermarkGenerator whenever one is requested.
            .<String>forGenerator(context -> new MyGenerator(MAX_OUT_OF_ORDERNESS))
            // The event time comes from the "ts" field of each JSON line.
            .withTimestampAssigner((element, recordTimestamp) -> extractEventTime(element));

        environment
            // Watermarks are generated right where the source reads the data.
            .fromSource(source, watermarkStrategy, "wc")
            .print();

        environment.execute();
    }

    /**
     * Reads the event-time timestamp from the {@code ts} field of a JSON record.
     *
     * @param element one input line, expected to be a JSON object
     * @return the value of the {@code ts} field
     * @throws IllegalArgumentException if the record has no {@code ts} field; raised explicitly
     *         so the failure names the offending record instead of surfacing as a bare
     *         {@code NullPointerException} from auto-unboxing
     */
    private static long extractEventTime(String element) {
        Long ts = JSON.parseObject(element).getLong("ts");
        if (ts == null) {
            throw new IllegalArgumentException("Record has no \"ts\" field: " + element);
        }
        return ts;
    }

    /**
     * Watermark generator that emits a watermark for every event (punctuated style).
     *
     * <p>Guarantees it is written to uphold:
     * <ol>
     *   <li>A watermark is a point in time, so the emitted values never decrease.</li>
     *   <li>A watermark must stay meaningful: it must be &lt;= the event time of the data.
     *       Once it overtakes the event times, large amounts of data become late.</li>
     * </ol>
     */
    private static class MyGenerator implements WatermarkGenerator<String>{

        /** Tolerated out-of-orderness, subtracted from the largest timestamp seen. */
        private final long delay;

        /** Largest event timestamp received so far. */
        private long maxTimeStamp;

        /**
         * @param delay tolerated out-of-orderness; must not be negative
         * @throws IllegalArgumentException if {@code delay} is negative, because the initial
         *         value below would otherwise wrap around and yield a huge first watermark
         */
        public MyGenerator(long delay){
            if (delay < 0) {
                throw new IllegalArgumentException("delay must not be negative: " + delay);
            }
            this.delay = delay;
            // Start value chosen so that (maxTimeStamp - 1 - delay) is exactly Long.MIN_VALUE
            // before any event arrives, i.e. the first subtraction cannot underflow.
            this.maxTimeStamp = Long.MIN_VALUE + 1 + delay;
        }

        /**
         * Called for every event: updates the largest timestamp seen and immediately emits
         * the corresponding watermark.
         */
        @Override
        public void onEvent(String event, long eventTimestamp, WatermarkOutput output) {
            // Taking the max keeps the emitted watermark from ever going backwards.
            maxTimeStamp = Math.max(maxTimeStamp, eventTimestamp);

            Watermark watermark = new Watermark(maxTimeStamp - 1 - delay);
            System.out.println("向下游发送了:" + watermark.getTimestamp());
            output.emitWatermark(watermark);
        }

        /**
         * Intentionally a no-op: this demo emits per event in {@link #onEvent}. To switch to
         * periodic emission, build and emit the same watermark here instead of in
         * {@code onEvent}.
         */
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // Nothing to do; see the method comment.
        }
    }
}
